Thread: Key-Value in shared memory

  1. #1
    Registered User
    Join Date
    May 2015
    Posts
    6

    Key-Value in shared memory

    Hi everybody!

    I'm having some troubles with shared memory. What I want to achieve is to have a key-value storage (both key and value are strings) in shared memory (because of multiple processes accessing to the storage). What I do is the following:

    Code:
    char **kvStore;
    
    [...]
    
    shmem_kv = shmget(SHMEM_KV, sizeof(char *) * 200 * 2, IPC_CREAT | 0666);
    
    //Attach to newly created memory
    kvStore = shmat(shmem_kv, NULL, 0);
    if (kvStore == (char *)(-1))
    {
        fprintf(stderr, "Error allocating shared memory for kvStore\n");
    }
    
    for(int i = 0; i < 200 * 2; i++)
    {
        shmem_entry = shmget(SHMEM_ENTRIES + i, sizeof(char) * 100, IPC_CREAT | 0666);
        kvStore[i] = shmat(shmem_entry, NULL, 0);
    
        if (kvStore[i] == (char *)(-1))
        {
            fprintf(stderr, "Error allocating shared memory for kvStore entry\n");
        }
    }
    Ideally, kvStore is the representation of such k-v storage. Instead of having a "real" matrix, I decided to put everything on a single array, then the first 200 entries are for keys, and the last 200 are for values (e.g. key position 5 => value position = 200 + 5). Am I doing something wrong in the declaration of the share memory? because when I decide to store stuff and, in a second moment, read it I am not able to read it anymore...

  2. #2
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    39,661
    > because when I decide to store stuff and, in a second moment, read it I am not able to read it anymore...
    You should post these programs in their entirety.
    Short, Self Contained, Correct Example
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  3. #3
    Registered User
    Join Date
    May 2015
    Posts
    6
    Ok, here you are the whole program (it's not compilable to keep it shot):
    Code:
    #define KV_LENGTH 100
    #define SHMEM_ENTRIES 12345000
    #define READ_REQUEST "[R]"
    #define WRITE_REQUEST "[W]"
    
    char **kvStore;
    
    void attachToSharedMemory()
    {
        for(int i = 0; i < 400; i++)
        {
            if((shmem_entry = shmget(SHMEM_ENTRIES + i, sizeof(char) * KV_LENGTH, 0666)) < 0)
            {
                fprintf(stderr,"Error locating kvStore entry segment\n");
                exit(1);
            }
    
            if ((kvStore[i] = shmat(shmem_entry, NULL, 0)) < 0)
            {
                fprintf(stderr, "Error attaching kvStore shared memory\n");
                exit(1);
            }
        }
    }
    
    void handleRequest(char* buffer)
    {
        char request[4];
        extractRequestType(buffer, request);
        
        attachToSharedMemory();
    
        if(strcmp(request, READ_REQUEST) == 0)
        {
            int keyLocation = lookupKey(extractKeyFromBuffer(buffer));
    
            if(keyLocation != -1)
            {
                sendMessage(kvStore[keyLocation + 200]);
            }
            else
            {
                exit(1);
            }
        }
        else
        {
            char key[KV_LENGTH], value[KV_LENGTH];
            extractkey(buffer, key);
            extractValue(buffer, value);
    
            int keyLocation = lookupKey(key);
    
            if(keyLocation != -1)
            {
                //Key does not exist yet
                int emptySpace = lookupEmptySpace();
    
                kvStore[emptySpace] = key;
                kvStore[emptySpace + 200] = value;
            }
            else
            {
                //Key already exists, update value
                kvStore[keyLocation + 200] = value;
            }
    
            sendMessage("Done");
        }
    }
    
    void networking()
    {
        char buffer[512];
    
        if(listenForMessages(buffer))  //blocking call
        {
            int myPid = getpid();
    
            fork();
    
            if(getpid() != myPid)  //forked process takes care of handling request
            {
                handleRequest(buffer);
            }
        }
    }
    
    void startKVStore()
    {
        //Create shared memory for kvStore
        shmem_kv = shmget(SHMEM_KV, sizeof(char *) * KVSTORE_ROWS * KVSTORE_COLUMNS, IPC_CREAT | 0666);
    
        //Attach to newly created memory
        kvStore = shmat(shmem_kv, NULL, 0);
        if (kvStore == (char *)(-1))
        {
            fprintf(stderr, "Error allocating shared memory for kvStore\n");
        }
    
        for(int i = 0; i < KVSTORE_ROWS * KVSTORE_COLUMNS; i++)
        {
            shmem_entry = shmget(SHMEM_ENTRIES + i, sizeof(char) * KV_LENGTH, IPC_CREAT | 0666);
            kvStore[i] = shmat(shmem_entry, NULL, 0);
    
            if (kvStore[i] == (char *)(-1))
            {
                fprintf(stderr, "Error allocating shared memory for kvStore entry\n");
            }
        }
    }
    
    int main()
    {
        startKVStore();
        networking();
    
        return 0;
    }
    I believe the problem resides with shared memory stuff (maybe it is not like that that I allocate memory for a string matrix, or maybe I attach in the wrong way) because I tested all functions with pthreads and everything worked flawlessly...

    PS: I also have access control (using semaphores) to prevent race conditions & other unwanted stuff. I didn't post them as, at the moment, these features are disabled (for bug-hunting reasons)
    Last edited by n0n4m3; 05-14-2015 at 09:28 AM. Reason: additional info

  4. #4
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    39,661
    > Ok, here you are the whole program (it's not compilable to keep it shot):
    And how do you expect us to test it then?
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  5. #5
    Registered User
    Join Date
    May 2015
    Posts
    6
    Sorry, I misunderstood your first answer and provided a pseudocode-like implementation to keep it short. Unfortunately I am not able to provide a code which is smaller. The following code is compilable and has the minimal feature set.

    Code:
    //System includes
    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    #include <arpa/inet.h>
    #include <sys/socket.h>
    #include <sys/unistd.h>
    #include <sys/shm.h>
    #include <stdbool.h>
    #include <sys/ipc.h>
    
    //Network defines
    #define BUFLEN 512  //Max length of buffer
    #define PORT 8888   //The port on which to listen for incoming data
    
    //Possible operations
    #define READ_REQUEST "[R]"
    #define WRITE_REQUEST "[W]"
    
    //Store realted
    #define KVSTORE_ROWS 200
    #define KVSTORE_COLUMNS 2
    #define KV_LENGTH 100
    
    //IDs for sared memory
    #define SHMEM_KV 123456789
    #define SHMEM_WA 1928373645
    #define SHMEM_RC 918273645
    #define SHMEM_ENTRIES 12345000
    
    //Global variables
    char **kvStore;                                                         //our key-value storage
    int shmem_rc, shmem_kv, shmem_wa;                                       //different shared memories
    int shmem_entry;
    
    //Inits the kvStore (size: KVSTORE_SIZE x KVSTORE_SIZE, key and value max length = KV_LENGTH)
    void initializeKVStore()
    {
        //Create shared memory for kvStore
        shmem_kv = shmget(SHMEM_KV, sizeof(char *) * KVSTORE_ROWS * KVSTORE_COLUMNS, IPC_CREAT | 0666);
    
        //Attach to newly created memory
        kvStore = shmat(shmem_kv, NULL, 0);
        if (kvStore == (char *)(-1))
        {
            fprintf(stderr, "Error allocating shared memory for kvStore\n");
        }
    
        for(int i = 0; i < KVSTORE_ROWS * KVSTORE_COLUMNS; i++)
        {
            shmem_entry = shmget(SHMEM_ENTRIES + i, sizeof(char) * KV_LENGTH, IPC_CREAT | 0666);
            kvStore[i] = shmat(shmem_entry, NULL, 0);
    
            if (kvStore[i] == (char *)(-1))
            {
                fprintf(stderr, "Error allocating shared memory for kvStore entry\n");
            }
        }
    }
    
    //Attach to all the shared memory
    bool attachToSharedMemory()
    {
        //kvStore related
        if((shmem_kv = shmget(SHMEM_KV, sizeof(char *) * KVSTORE_ROWS * KVSTORE_COLUMNS, 0666)) < 0)
        {
            fprintf(stderr,"Error locating kvStore segment\n");
            return false;
        }
    
        if ((kvStore = shmat(shmem_kv, NULL, 0)) < 0)
        {
            fprintf(stderr, "Error attaching kvStore shared memory\n");
            return false;
        }
    
    
        for(int i = 0; i < 400; i++)
        {
            if((shmem_entry = shmget(SHMEM_ENTRIES + i, sizeof(char) * KV_LENGTH, 0666)) < 0)
            {
                fprintf(stderr,"Error locating kvStore entry segment\n");
                return false;
            }
    
            if ((kvStore[i] = shmat(shmem_entry, NULL, 0)) < 0)
            {
                fprintf(stderr, "Error attaching kvStore shared memory\n");
                return false;
            }
        }
    
        return true;
    }
    
    //Look if the given key already exists in the kvStore. If not, return -1
    int lookupKey(char* key)
    {
        for(int i = 0; i < KVSTORE_ROWS; i++)
        {
            if(strcmp(kvStore[i], key) == 0)
            {
                fprintf(stdout, "Found key at %d. [k-v @ position %d: %s, %s]\n", i, i, kvStore[i], kvStore[i + KVSTORE_ROWS]);
                return i;
            }
        }
        return -1;
    }
    
    //Looks for an empty entry in the kvStore. If none is found, return -1
    int lookupEmptySpace()
    {
        for(int i = 0; i < KVSTORE_ROWS; i++)
        {
            if(strcmp(kvStore[i], "") == 0)
            {
                fprintf(stdout, "\tEmpty space found at %d\n", i);
                return i;
            }
        }
    
        return -1;
    }
    
    //Reads a value for a given key. If the key is not found, NULL is returned
    char* readValue(char* key)
    {
        if(!attachToSharedMemory())
        {
            fprintf(stderr, "Error attaching to shared memory\n");
            exit(1);
        }
    
        int keyLocation = lookupKey(key);
    
        //If key is present proceed with locks & magic
        if(keyLocation != -1)
        {
            return kvStore[keyLocation + KVSTORE_ROWS];
        }
    
        //No key was found
        return NULL;
    }
    
    //Writes a K-V entry in the kvStore. If the key already exists update the value
    bool writeEntry(char* key, char* value)
    {
        if(!attachToSharedMemory())
        {
            fprintf(stderr, "Error attaching to shared memory or semaphores.\n");
            exit(1);
        }
    
        int emptySpace;
        int keyLocation = lookupKey(key);
    
        if(keyLocation == -1)
        {
            //Key is not present, look for empty space
            fprintf(stdout, "\tEntry for key '%s' not found, writing to empty space\n", key);
            emptySpace = lookupEmptySpace();
    
            //Empty space found
            if(emptySpace != -1)
            {
                kvStore[emptySpace] = key;
                kvStore[emptySpace + KVSTORE_ROWS] = value;
    
                printf("\t[write] written key at %d and value at %d (%s-%s)\n", emptySpace, emptySpace + KVSTORE_ROWS, kvStore[emptySpace], kvStore[emptySpace + KVSTORE_ROWS]);
    
            }
            else
            {
                //Empty space could not be fount: returning error
                return false;
            }
        }
        else
        {
            fprintf(stdout, "Entry for key %s found at location %d, overwriting data\n", key, keyLocation);
    
            kvStore[keyLocation + KVSTORE_ROWS] = value;
        }
    
        return true;
    }
    
    //Counts the key length
    int countKeyLength(char* buffer, int msg_len)
    {
        char *valuePointer = strstr(buffer, "[V]");
        int count = 0;
    
        if(valuePointer == NULL)
        {
            count = msg_len - 3;
        }
        else
        {
            count = (valuePointer - buffer) - 3;
        }
    
        return count + 1;
    }
    
    //Counts the value length
    int countValueLength(char* buffer, int msg_len)
    {
        char *valuePointer = strstr(buffer, "[V]");
    
        if(valuePointer == NULL)
        {
            return -1;
        }
    
        return msg_len - (valuePointer - buffer) - 3 + 1;
    }
    
    //Extracts the key from the buffer
    void extractKeyFromBuffer(char* buffer, char* key, int keyLength)
    {
        for(int i = 0; i < keyLength - 1; i++)
        {
            key[i] = buffer[i + 3];
        }
    
        key[keyLength - 1] = '\0';
    }
    
    //Extracts the value from the buffer
    void extractValueFromBuffer(char* buffer, char* value, int valueLength)
    {
        char *valuePointer = strstr(buffer, "[V]");
    
        if(valuePointer == NULL)
        {
            fprintf(stderr, "Error when extracting value: no value found");
            exit(1);
        }
    
        for(int i = 0; i < valueLength; i++)
        {
            value[i] = valuePointer[i + 3];
        }
    
        value[valueLength - 1] = '\0';
    }
    
    void handleRequest(char* buf, int msg_len)
    {
        char reqType[4], reply[BUFLEN + 8];
        char *toSend = "";
        const char *stdAnwer = "Answer: ";
    
        strncpy(reqType, buf, 3);
        reqType[3] = '\0';
    
        if(strcmp(reqType, READ_REQUEST) == 0)
        {
            //Request to read entry from kvStore
            printf("\tRead request received\n");
    
            int keyLength = countKeyLength(buf, msg_len);
    
            char key[keyLength];
            extractKeyFromBuffer(buf, key, keyLength);
    
            printf("\tWilling to read value for key '%s'\n", key);
    
            if(strcmp(key, "") != 0)
            {
                toSend = readValue(key);
    
                if(toSend != NULL)
                {
                    printf("\tKey: %s\n", key);
                    printf("\tValue returned: %s\n", toSend);
                }
                else
                {
                    printf("\tKey '%s' not found\n", key);
                    toSend = "[ERROR]: Key not found!";
                }
            }
            else
            {
                toSend = "[ERROR]: No key sent";
            }
        }
        else if(strcmp(reqType, WRITE_REQUEST) == 0)
        {
            //Request to write in kvStore
            printf("\tWrite request received\n");
            int keyLength = countKeyLength(buf, msg_len);
            int valueLength = countValueLength(buf, msg_len);
    
            char key[keyLength+1], value[valueLength];
    
            extractKeyFromBuffer(buf, key, keyLength);
            extractValueFromBuffer(buf, value, valueLength);
    
            if(strcmp(key, "") != 0 && strcmp(value, "") != 0)
            {
                printf("\tKey: %s\n", key);
                printf("\tValue: %s\n", value);
    
                toSend = writeEntry(key, value) ? "Entry insterted correctly" : "[ERROR] Entry could not be inserted. kvStore full!";
            }
            else
            {
                toSend = "[ERROR] Key/Value not set";
            }
        }
        else
        {
            toSend = "Command not found";
        }
    
        for(int i = 0; i < strlen(stdAnwer) + 8; i++)
        {
            reply[i] = stdAnwer[i];
        }
    
        strcat(reply, toSend);
        strcat(reply, "\n");
    
        printf("REPLY: %s", reply);
    }
    
    int startNetworkServer()
    {
        while(1)
        {
            printf("Waiting for data...\n");
            fflush(stdout);
    
            char buf[BUFLEN];
            scanf("%s", buf);
    
            int originalPid = getpid();
    
            fork();
    
            if(getpid() != originalPid)
            {
                handleRequest(buf, strlen(buf));
    
                for(int i = 0; i < BUFLEN; i++)
                {
                    buf[i] = '\0';
                }
            }
        }
    
        return 0;
    }
    
    int main(void)
    {
        initializeKVStore();
        startNetworkServer();
    
        return 0;
    }
    What this does is start a "server" which listens for messages. The "server" handles messages formatted as follows:
    • [R]mykey => asks the server to locate the key 'mykey' and returns the value of such key
    • [W]mykey[V]myvalue => tell the server to store the k-v pair 'mykey'-'myvalue' in the storage

    The strange behaviour that I am having is that after a write process, if I want to read what I've just written in the storage, the system says that there is no such key. In addition if I add some other value it will always add everything at position 0 (as if it is always free).

  6. #6
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    39,661
    > kvStore[emptySpace] = key;
    > kvStore[emptySpace + KVSTORE_ROWS] = value;
    Try replacing these with strcpy()

    And find other places where you mistake string assignment.


    How many processes do you end up with, and which is reading the input?
    Code:
            fork();
    
            if(getpid() != originalPid)
            {
                handleRequest(buf, strlen(buf));
    
                for(int i = 0; i < BUFLEN; i++)
                {
                    buf[i] = '\0';
                }
            }
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  7. #7
    Registered User
    Join Date
    May 2015
    Posts
    6
    Seriously, I went trough those lines like a thousand times thinking about wrong indices and stuff, but never thought about strcpy() T_T. Thank you for your help & time
    For your other question: there is 1 process listening for tcp/udp connections and once it receives a connection it forks itself. The newly created process handles the request, while the original process goes back to listen for additional connections. Ideally, one is listening while there could be up to 20 processes doing IO on the shared memory. Is there a problem with this kind of solution?

  8. #8
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    39,661
    The problem you have at the moment is you're calling fork() each time around the loop.

    Each successive fork() results in a new process attempting to read stdin on the next iteration.
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  9. #9
    Registered User
    Join Date
    May 2015
    Posts
    6
    Uhm, I think I don't get what you mean. In the compilable solution given above I removed the tcp/udp stuff for trimming down the code length.
    For testing purposes I have 5 different terminals, each connected to the server with netcat, so I have 5 different stdin. At the moment things work like this:

    Code:
    while(1)
    {
        //try to receive some data, this is a blocking call
        if ((recv_len = recvfrom(s , buf, BUFLEN, 0, (struct sockaddr *) &si_other, (socklen_t*)&slen)) == -1)
        {
            die("recvfrom()");
        }
        
        printf("Received packet from %s:%d\n", inet_ntoa(si_other.sin_addr), ntohs(si_other.sin_port));
    
        struct args_struct args;
        args.buffer = buf;
        args.socket = s;
        args.slen = slen;
        args.si_other = si_other;
    
        int originalPid = getpid();
    
        fork();
    
        if(getpid() != originalPid)
        {
            handleRequest(&args, recv_len);
            memset(buf, 0, sizeof(buf);
         }
    }
    The only problem I see is that if two different queries arrive at the same time (or with very short delay) the "main" process could still be forking itself, hence losing the second request.

  10. #10
    Tweaking master Aslaville's Avatar
    Join Date
    Sep 2012
    Location
    Rogueport
    Posts
    528
    I guess you are good to go.

    The blocking call(which you had ommitted in the previous code) changes everything.

  11. #11
    and the hat of int overfl Salem's Avatar
    Join Date
    Aug 2001
    Location
    The edge of the known universe
    Posts
    39,661
    Fine, put this at the start of your while(1) loop.

    Code:
    printf("Running loop with pid=%d\n", getpid() );
    If you dance barefoot on the broken glass of undefined behaviour, you've got to expect the occasional cut.
    If at first you don't succeed, try writing your phone number on the exam paper.

  12. #12
    Registered User
    Join Date
    May 2015
    Posts
    6
    I see what you mean after forking, the child process does the handleRequest function, but after that it just continues the normal execution, starting with the while loop. I modified as follows:
    Code:
    if(fork() == 0)
    {
        handleRequest(&args, recv_len);
    
        break;
    }
    Just added a simple break inside the if clause and this did the trick of stopping child processes to do other things. Thank you for the correction
    Last edited by n0n4m3; 05-16-2015 at 06:54 AM.

Popular pages Recent additions subscribe to a feed

Similar Threads

  1. STL in Shared Memory
    By AlenDev in forum C++ Programming
    Replies: 1
    Last Post: 07-15-2010, 04:04 AM
  2. ICP shared memory
    By cavemandave in forum C Programming
    Replies: 1
    Last Post: 11-20-2007, 06:08 AM
  3. Shared Memory...
    By suzan in forum Linux Programming
    Replies: 1
    Last Post: 02-16-2006, 02:29 AM
  4. Shared Memory
    By wardej2 in forum Linux Programming
    Replies: 8
    Last Post: 10-21-2005, 07:48 AM
  5. using shared memory in c
    By flawildcat in forum C Programming
    Replies: 1
    Last Post: 04-09-2002, 12:25 PM

Tags for this Thread